// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "data_queue.h"

#include <glog/logging.h>

#include <algorithm>
#include <mutex>
#include <utility>

#include "pipeline/dependency.h"
#include "vec/core/block.h"

namespace doris {
namespace pipeline {
#include "common/compile_check_begin.h"
DataQueue::DataQueue(int child_count)
        : _queue_blocks_lock(child_count),
          _queue_blocks(child_count),
          _free_blocks_lock(child_count),
          _free_blocks(child_count),
          _child_count(child_count),
          _is_finished(child_count),
          _is_canceled(child_count),
          _cur_bytes_in_queue(child_count),
          _cur_blocks_nums_in_queue(child_count),
          _flag_queue_idx(0) {
    for (int i = 0; i < child_count; ++i) {
        _queue_blocks_lock[i].reset(new std::mutex());
        _free_blocks_lock[i].reset(new std::mutex());
        _is_finished[i] = false;
        _is_canceled[i] = false;
        _cur_bytes_in_queue[i] = 0;
        _cur_blocks_nums_in_queue[i] = 0;
    }
    _un_finished_counter = child_count;
    _sink_dependencies.resize(child_count, nullptr);
}

std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
    {
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
        if (!_free_blocks[child_idx].empty()) {
            auto block = std::move(_free_blocks[child_idx].front());
            _free_blocks[child_idx].pop_front();
            return block;
        }
    }

    return vectorized::Block::create_unique();
}

void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
    DCHECK(block->rows() == 0);

    if (!_is_low_memory_mode) {
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
        _free_blocks[child_idx].emplace_back(std::move(block));
    }
}

void DataQueue::clear_free_blocks() {
    for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
        std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
        std::deque<std::unique_ptr<vectorized::Block>> tmp_queue;
        _free_blocks[child_idx].swap(tmp_queue);
    }
}

void DataQueue::terminate() {
    for (int i = 0; i < _queue_blocks.size(); i++) {
        set_finish(i);
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[i]));
        if (_cur_blocks_nums_in_queue[i] > 0) {
            _queue_blocks[i].clear();
            _cur_bytes_in_queue[i] = 0;
            _cur_blocks_nums_in_queue[i] = 0;
            _sink_dependencies[i]->set_always_ready();
        }
    }
    clear_free_blocks();
}

//check which queue have data, and save the idx in _flag_queue_idx,
//so next loop, will check the record idx + 1 first
//maybe it's useful with many queue, others maybe always 0
bool DataQueue::remaining_has_data() {
    int count = _child_count;
    while (--count >= 0) {
        _flag_queue_idx++;
        if (_flag_queue_idx == _child_count) {
            _flag_queue_idx = 0;
        }
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
            return true;
        }
    }
    return false;
}

//the _flag_queue_idx indicate which queue has data, and in check can_read
//will be set idx in remaining_has_data function
Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* output_block,
                                       int* child_idx) {
    if (_is_canceled[_flag_queue_idx]) {
        return Status::InternalError("Current queue of idx {} have beed canceled: ",
                                     _flag_queue_idx);
    }

    {
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[_flag_queue_idx]));
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
            *output_block = std::move(_queue_blocks[_flag_queue_idx].front());
            _queue_blocks[_flag_queue_idx].pop_front();
            if (child_idx) {
                *child_idx = _flag_queue_idx;
            }
            _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
            _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
            if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
                _sink_dependencies[_flag_queue_idx]->set_ready();
            }
            auto old_value = _cur_blocks_total_nums.fetch_sub(1);
            if (old_value == 1 && _source_dependency) {
                set_source_block();
            }
        }
    }
    return Status::OK();
}

Status DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_idx) {
    if (!block) {
        return Status::OK();
    }
    {
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
        if (_is_finished[child_idx]) {
            return Status::EndOfFile("Already finish");
        }
        _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
        _queue_blocks[child_idx].emplace_back(std::move(block));
        _cur_blocks_nums_in_queue[child_idx] += 1;

        if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) {
            _sink_dependencies[child_idx]->block();
        }
        _cur_blocks_total_nums++;

        set_source_ready();
    }
    return Status::OK();
}

void DataQueue::set_finish(int child_idx) {
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
    if (_is_finished[child_idx]) {
        return;
    }
    _is_finished[child_idx] = true;
    if (_un_finished_counter.fetch_sub(1) == 1) {
        _is_all_finished = true;
    }
    set_source_ready();
}

void DataQueue::set_canceled(int child_idx) {
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
    DCHECK(!_is_finished[child_idx]);
    _is_canceled[child_idx] = true;
    _is_finished[child_idx] = true;
    if (_un_finished_counter.fetch_sub(1) == 1) {
        _is_all_finished = true;
    }
    set_source_ready();
}

bool DataQueue::is_finish(int child_idx) {
    return _is_finished[child_idx];
}

bool DataQueue::is_all_finish() {
    return _is_all_finished;
}

void DataQueue::set_source_ready() {
    if (_source_dependency) {
        std::unique_lock lc(_source_lock);
        _source_dependency->set_ready();
    }
}

void DataQueue::set_source_block() {
    if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
        std::unique_lock lc(_source_lock);
        // Performing the judgment twice, attempting to avoid blocking the source as much as possible.
        if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
            _source_dependency->block();
        }
    }
}

} // namespace pipeline
} // namespace doris
